Fork me on GitHub

【Java多线程】JUC锁 05. ReentrantLock

ReentrantLock

1. 前言

  • ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。
  • ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。
  • ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁
  • 最多支持同一线程的2147483647个递归锁,超过将抛出Error
  • ReentrantLocksynchronized 区别:

    • ReentrantLock 非阻塞,synchronized 阻塞
    • ReentrantLock 获取锁时候可以中断,可以设置超时时间

    • ReentrantLock 灵活性更高,允许获取多个锁,以不同顺序释放锁;允许在不同范围释放锁

    • 都具备线程重入特性;ReentrantLock 表现为API层面的互斥锁, synchronized 表现为原生语法层面的互斥锁

    • ReentrantLock 具备以下高级功能:
      • 等待可中断
      • 可实现公平锁:必须按照申请锁的时间顺序来依次获得锁
      • 锁可以绑定多个条件

2. 源码解析

2.1 数据结构

  • ReentrantLock与sync是组合关系。ReentrantLock中,包含了Sync对象,Sync是AQS的子类;
  • Sync有两个子类FairSync(公平锁)和NonFairSync(非公平锁)。ReentrantLock是一个独占锁,它是公平锁还是非公平锁由Sync对象实例决定。

2.2函数列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ReentrantLock implements Lock, java.io.Serializable {

private final Sync sync; //内部类,AQS子类,基于AQS实现
//构造方法,默认是“非公平锁”
public ReentrantLock() {
sync = new NonfairSync();
}
//fair为true表示是公平锁,fair为false表示是非公平锁。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

// 查询当前线程保持此锁的次数。
int getHoldCount()
// 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。
protected Thread getOwner()
// 返回一个 collection,它包含可能正等待获取此锁的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正等待获取此锁的线程估计数。
int getQueueLength()
// 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。
protected Collection<Thread> getWaitingThreads(Condition condition)
// 返回等待与此锁相关的给定条件的线程估计数。
int getWaitQueueLength(Condition condition)
// 查询给定线程是否正在等待获取此锁。
boolean hasQueuedThread(Thread thread)
// 查询是否有些线程正在等待获取此锁。
boolean hasQueuedThreads()
// 查询是否有些线程正在等待与此锁有关的给定条件。
boolean hasWaiters(Condition condition)
// 如果是“公平锁”返回true,否则返回false。
boolean isFair()
// 查询当前线程是否保持此锁。
boolean isHeldByCurrentThread()
// 查询此锁是否由任意线程保持。
boolean isLocked()
// 获取锁。
void lock()
// 如果当前线程未被中断,则获取锁。
void lockInterruptibly()
// 返回用来与此 Lock 实例一起使用的 Condition 实例。
Condition newCondition()
// 仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
boolean tryLock()
// 如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。
boolean tryLock(long timeout, TimeUnit unit)
// 试图释放此锁。
void unlock()
}

2.3 公平锁和非公平锁

2.3.1 公平锁

  • lock()

公平锁在FairSync类实现:

1
2
3
final void lock() {
acquire(1); //设置状态,锁可获取时候,为0;锁被线程获取,状态值为1;可重入锁,状态值state+1
}

acquire 在AQS中实现。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) && //获取锁成功直接返回,失败则进入等待队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//独占模式
selfInterrupt();//自己产生一个中断。在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
}

过程如下:

  • 首先通过tryAcquire()尝试获取锁,获取成功直接返回;
  • 当前线程tryAcquire()获取失败,通过addWaiter(Node.EXCLUSIVE)方法将当前线程添加入CLH等待队列末尾
  • 加入CLH队列后,通过acquireQueued()方法获取锁。“当前线程”在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果“当前线程”在休眠等待过程中被中断过,acquireQueued()会返回true,此时”当前线程”会调用selfInterrupt()来自己给自己产生一个中断
  1. tryAcquire方法在FairSync实现,尝试获取锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //可获取锁,锁没有被任何线程拥有
if (!hasQueuedPredecessors() && //当前线程是CLH队列中第一个线程或者CLH为空
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
//CLH队列为空或者队首为当前线程返回false
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
  1. addWaiter加入CLH等待队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//AQS中实现,加入等待队列尾节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //自旋加入节点
return node;
}
  1. acquireQueued获取锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//AQS中实现
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//p == head 公平性原则,线程可能由于中断而唤醒影响公平,需要当前线程被其他线程unpark
if (p == head && tryAcquire(arg)) { //当前继节点是CLH队列的头节点,并且它释放锁之后;就轮到当前节点获取锁了。然后,当前节点通过tryAcquire()获取锁;获取成功的话,通过setHead(node)设置当前节点为头节点,并返回。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //判断“当前线程是否应该阻塞”
parkAndCheckInterrupt()) //阻塞线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 判断“当前线程是否应该阻塞”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点’的前继节点”。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
  • unlock()
1
2
3
public void unlock() {
sync.release(1); //每次释放,锁的状态 -1
}

release释放锁在AQS中实现

1
2
3
4
5
6
7
8
9
10
//尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
//Sync中实现,尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {// 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//唤醒当前线程的后继线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) //设置状态为0, 可获取锁
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

2.3.2 非公平锁

非公平锁和公平锁主要在获取锁上有差别。

公平锁在尝试获取锁时,即使“锁”没有被任何线程锁持有,它也会判断自己是不是CLH等待队列的表头;是的话,才获取锁。
而非公平锁在尝试获取锁时,如果“锁”没有被任何线程持有,则不管它在CLH队列的何处,它都直接获取锁。

  • lock()
1
2
3
4
5
6
7
//NonfairSync类中实现,会先通过CAS直接获取锁;失败调用acquire(1)获取锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
  • tryAcquire()
1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//Sync类中实现,非公平尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {//不需要判断在CLH首节点
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3496147.html

http://www.cnblogs.com/skywang12345/p/3496147.html